# This code tracks the status of credit card accounts that were current in
# Jan 2009 and noncurrent in Jun 2009. Output data file is at account level. 

#########################
###       Setup       ###
#########################
# Load packages

import pyspark.sql.functions as sqlf
import pandas as pd
import numpy  as np
import os
import sys
import shutil
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools2 import *

# Filesystem locations used by this script.
#   WORKDIR - becomes the process working directory (see os.chdir below) and
#             is also passed to saveStata in the main section.
#   CODEDIR - not referenced in the visible code.
#   OUTDIR  - passed to saveStata in the main section as the output location.
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR  = '/geo_debt/geo/Output/Table'


# Crosswalk table, written as a parquet.`<path>` reference (presumably for
# direct use in a Spark SQL FROM clause; judging by the name, a ZIP-to-CZ
# crosswalk - TODO confirm). Not referenced in the visible code.
CW      = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

# Switch to the working directory
os.chdir(WORKDIR)


### Helper functions specific to this exercise
def xxSelectItems(tradetbID, headID, loopNum):
    """Build the SELECT-list fragment holding one status column per period.

    For every period k in 1..loopNum a ``CASE ... END AS cc_status<k>``
    expression is generated that reads from the trade alias
    ``<tradetbID><k>`` and the header alias ``<headID><k>``.

    Status codes produced by each CASE expression, in evaluation order:
        '99'   - header row missing for the period (treated as dead)
        '0'    - trade row missing for the period (tradeline dropped)
        '1'    - current (mannerOfPayment '01' / 1)
        '2'    - noncurrent (mannerOfPayment '02'-'05', '07'-'09', '9B')
        'UR_*' - mannerOfPayment 'UR' with a non-null affiliateRemarkCode;
                 the code is appended after 'UR_'
        '77'   - mannerOfPayment 'UR' with a null affiliateRemarkCode
        other  - the raw mannerOfPayment value

    Args:
        tradetbID: alias prefix of the per-period trade subqueries.
        headID: alias prefix of the per-period header subqueries.
        loopNum: number of periods (integer); values below 1 give ''.

    Returns:
        The CASE expressions separated by ", " (no leading or trailing
        comma), or an empty string when there are no periods.
    """
    # Every piece keeps its original leading/trailing blanks so that the
    # generated SQL text is unchanged.
    case_template = (
        "CASE WHEN ({h}.subjectKey IS NULL) THEN '99' "
        " WHEN ({t}.subjectKey IS NULL AND {t}.tradeKey IS NULL) THEN '0' "
        " WHEN ({t}.mannerOfPayment IN ('01',1)) THEN '1' "
        " WHEN ({t}.mannerOfPayment IN ('02','03','04','05','07','08','09','9B',2,3,4,5,7,8,9)) THEN '2' "
        " WHEN ({t}.mannerOfPayment IN ('UR') AND {t}.affiliateRemarkCode IS NOT NULL ) THEN CONCAT('UR_',{t}.affiliateRemarkCode) "
        " WHEN ({t}.mannerOfPayment IN ('UR') AND {t}.affiliateRemarkCode IS NULL ) THEN '77' "
        " ELSE {t}.mannerOfPayment END AS cc_status{n} "
    )

    status_columns = [
        case_template.format(h=headID + str(period),
                             t=tradetbID + str(period),
                             n=period)
        for period in range(1, loopNum + 1)
    ]
    return ", ".join(status_columns)

def xxJoinTable_subjectKey(join, tbName, tbID, loopNum):
    """Build a chain of JOIN clauses matched on subjectKey only.

    For each period k in 1..loopNum the table expression ``tbName[k]`` is
    joined under the alias ``<tbID><k>`` to the driving table ``a`` on
    ``a.subjectKey``.

    Args:
        join: join keyword(s) placed before each table, e.g. "LEFT JOIN".
        tbName: mapping (or sequence) indexed 1..loopNum that yields the
            table name or subquery text for each period.
        tbID: alias prefix; the period number is appended to it.
        loopNum: number of periods (integer); values below 1 give ''.

    Returns:
        The concatenated JOIN clauses as one SQL fragment.
    """
    clauses = []
    for period in range(1, loopNum + 1):
        alias = tbID + str(period)
        clauses.append("".join([
            " ", join, " ", tbName[period], alias,
            " ON a.subjectKey=", alias, ".subjectKey ",
        ]))
    return "".join(clauses)

def xxJoinTable_subANDtradeKey(join, tbName, tbID, loopNum):
    """Build a chain of JOIN clauses matched on subjectKey and tradeKey.

    For each period k in 1..loopNum the table expression ``tbName[k]`` is
    joined under the alias ``<tbID><k>`` to the driving table ``a`` on both
    ``a.subjectKey`` and ``a.tradeKey``.

    Args:
        join: join keyword(s) placed before each table, e.g. "LEFT JOIN".
        tbName: mapping (or sequence) indexed 1..loopNum that yields the
            table name or subquery text for each period.
        tbID: alias prefix; the period number is appended to it.
        loopNum: number of periods (integer); values below 1 give ''.

    Returns:
        The concatenated JOIN clauses as one SQL fragment.
    """
    def one_clause(period):
        # Alias shared by the table expression and both key comparisons.
        alias = tbID + str(period)
        on_condition = ("a.subjectKey=" + alias + ".subjectKey AND "
                        + "a.tradeKey=" + alias + ".tradeKey ")
        return " " + join + " " + tbName[period] + alias + " ON " + on_condition

    return "".join(one_clause(period) for period in range(1, loopNum + 1))

#########################
###       Main        ###
#########################

# Cohort month: accounts must be noncurrent in this month (Jun 2009).
startY = 2009
startM = 6

# Follow-up design: one snapshot every FOLLOWUP_STEP_MONTHS months after the
# cohort month, N_FOLLOWUPS snapshots in total (Dec 2009 through Jun 2017).
# These values drive both the table-loading loop and the final query, so the
# number of cc_status columns always matches the number of joined tables.
FOLLOWUP_STEP_MONTHS = 6
N_FOLLOWUPS = 16

# NOTE(review): sqlContext is not created in this file; presumably it comes
# from `from SparkDataTools2 import *` or the pyspark shell - confirm.
y = startY
m = startM
headersDataST = table_setup(sqlContext, y, m, 'headers')
headersDataST.createOrReplaceTempView("headersDataST")

tradesDataST = table_setup(sqlContext, y, m, 'trades')
tradesDataST.createOrReplaceTempView("tradesDataST")

# Baseline month: accounts must be current in this month (Jan 2009).
lastHY = 2009
lastHM = 1
tradesDataST0 = table_setup(sqlContext, lastHY, lastHM, 'trades')
tradesDataST0.createOrReplaceTempView("tradesDataST0")

# require that the account is current in Jan 2009
pre_trade = ("SELECT subjectKey, tradeKey FROM tradesDataST0 "
             "WHERE mannerOfPayment IN ('01',1) AND assetClassTag = 'CRED' "
             "GROUP BY subjectKey, tradeKey ")

# require that the account is noncurrent in Jun 2009
# The age filter divides (asOfDate - birthDate) by 10000 and keeps 20..80;
# presumably both columns are yyyymmdd integers - TODO confirm.
start_trade = (
    "SELECT t.subjectKey, t.tradeKey "
    "FROM tradesDataST t "
    "INNER JOIN headersDataST h ON t.subjectKey=h.subjectKey "
    "INNER JOIN (" + pre_trade + ") t0 ON t.subjectKey=t0.subjectKey AND t.tradeKey=t0.tradeKey "
    "WHERE h.birthDate IS NOT NULL AND FLOOR((h.asOfDate - h.birthDate)/10000) BETWEEN 20 AND 80 "
    "AND t.mannerOfPayment IN ('02','03','04','05','07','08','09','9B',2,3,4,5,7,8,9) "
    "AND t.assetClassTag = 'CRED' "
    "GROUP BY t.subjectKey, t.tradeKey "
)


# Per-period DataFrames and the subquery text joined in the final query.
# All four dicts are keyed by the 1-based period number because the
# xxJoinTable_* helpers index them from 1 to loopNum.
trade_table_list = dict()
header_table_list = dict()
trade_filter = dict()
header_filter = dict()
for i in range(1, N_FOLLOWUPS + 1):
    # Move forward i steps from the cohort month. Counting months from zero
    # lets divmod handle the year rollover (month 12 stays in the same year).
    months_elapsed = y * 12 + (m - 1) + FOLLOWUP_STEP_MONTHS * i
    tmp_y, tmp_m = divmod(months_elapsed, 12)
    tmp_m = tmp_m + 1
    ym = yyyymm(tmp_y, tmp_m)
    print("^^^ "+ym+" ^^^")

    trade_table_list[i] = table_setup(sqlContext, tmp_y, tmp_m, 'trades')
    trade_table_list[i].createOrReplaceTempView("trade_table_list"+str(i))

    # Collapse the period's credit card trades to one row per account.
    # NOTE(review): FIRST() picks an arbitrary row when an account has more
    # than one row in the period, and the two FIRST() calls are independent -
    # confirm that trades are unique per (subjectKey, tradeKey).
    trade_filter[i] = ("(SELECT subjectKey, tradeKey, FIRST(mannerOfPayment) AS mannerOfPayment, FIRST(affiliateRemarkCode) AS affiliateRemarkCode "
                       " FROM trade_table_list" + str(i) +
                       " WHERE assetClassTag = 'CRED' GROUP BY subjectKey, tradeKey) ")

    header_table_list[i] = table_setup(sqlContext, tmp_y, tmp_m, 'headers')
    header_table_list[i].createOrReplaceTempView("header_table_list"+str(i))

    # NOTE(review): no de-duplication here; if a header table held several
    # rows per subjectKey the LEFT JOIN below would multiply output rows.
    header_filter[i] = "(SELECT subjectKey FROM header_table_list"+str(i)+") "

# One output row per cohort account, with one cc_status<k> column per period.
# NOTE(review): only subjectKey is selected, so rows of the same subject with
# different tradeKeys are not distinguishable in the output - confirm intent.
sql = (
    "SELECT a.subjectKey, " + xxSelectItems("t", "h", N_FOLLOWUPS) + " FROM ("
    + start_trade + ") a "
    + xxJoinTable_subANDtradeKey("LEFT JOIN", trade_filter, "t", N_FOLLOWUPS)
    + xxJoinTable_subjectKey("LEFT JOIN", header_filter, "h", N_FOLLOWUPS)
)

# The output name carries the cohort month, not the last follow-up month.
ym = yyyymm(y, m)
saveStata(sql, 'cc_decay_'+ym, WORKDIR, OUTDIR)


sqlContext.clearCache()

print(' !!!! SUCCESS !!!!')
